Java 并发(一) - 基础知识

Posted by LinYaoTian on 2018-09-02

线程安全性

要编写线程安全的代码,其核心在于要对状态访问操作进行管理,特别是对共享的(Shared)可变的(Mutable)状态的访问。

“共享”意味着变量可以由多个线程同时访问,而“可变”则意味着变量的值在其生命周期内可以发生变化。

如果当多个线程访问同一个可变的状态变量时没有使用合适的同步,那么程序就会出现错误。有三种方式可以修复这个问题:

  • 不在线程之间共享该状态变量
  • 将状态变量修改为不可变的变量
  • 在访问状态变量时使用同步

什么是线程安全性

定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

在线程安全类中封装了必要的同步机制,因此客户端无须进一步采取同步措施。

无状态对象:既不包含任何域,也不包含任何对其他类中域的引用。

无状态变量一定是线程安全的。

原子性

原子性从字面意思上看就是,要么全部都做,要么全部都不做。具有原子性的操作是线程安全的,例如 i = 1;不具有原子性的操作不是线程安全的,例如 i++。因为 i++ 实际上分为三步,读取i,将值加 1,写回 i

竞态条件

当某个计算结果的正确性取决于多个线程的交替执行时序时,那么就会发生竞态条件

竞态条件和原子性相关,或者说,之所以代码会发生竞态条件,就是因为代码不是以原子方式操作的,而是一种复合操作。

例如:

1
2
3
4
5
6
7
8
9
public class A {
private static A instance = null;
public static A getInstance(){
if(instance == null){
instance = new A();
}
return instance;
}
}

这里会存在竞态条件(先检查后执行)。假设线程 B1B2 同时执行 getInstanceB1 看到instance 为空,因此他会进入到 new A() 的操作创建 A 的实例。B2 同时也要判断 instance 是否为空,此时的 instance 是否为空,却取决于不可预测的时序(包括线程的调度方式),以及B1 要花多少时间来 new 一个 A 的实例。如果 B1new 操作时,轮到 B2 线程被调度,那么此时 B2 判断的 instance 也为空,因此到最后,会出现两个 A 的实例。

同理对于 i++ 也一样存在竞态条件(读取—修改—写入)。

解决:

  • 对于这两种竞态条件,我们要避免它,就要保证它是原子方式执行,即在某个线程修改该变量时,通过某种方式防止其他线程使用这个变量。
  • i++ 这种情况可以使用 concurrent.atomic 包实现原子操作,而先检查后执行这种竞态条件则可以通过加锁来实现同步。

concurrent.atomic 包实现原子操作

原子操作类主要用于高并发环境下的程序处理,这些处理主要有:
- 基本类:AtomicIntegerAtomicLongAtomicBoolean
- 引用类型:AtomicReference
- 数组类型:AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray

像上面提到的递增,就可以用原子类来实现

加锁机制

要保证状态的一致性,就需要在单个原子操作中更新所有相关的状态变量。

内置锁

Java 提供了一种内置的锁机制来支持原子性:同步代码块(Synchronized),同步代码块包括两部分:一个作为锁的对象引用,一个作为由这个锁保护的代码块。

对于前面先检查后执行的竞态条件,可以通过加锁来实现线程安全

1
2
3
4
5
6
7
8
9
public class A {
private static A instance = null;
public static synchronized A getInstance(){
if(instance == null){
instance = new A();
}
return instance;
}
}

谈到锁,就要谈到双重加锁机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class A {
private static A instance = null;
private byte[] lock = new byte[0];
public static A getInstance(){
if(instance == null){
synchronized (lock){ //1
if(instance == null){ //2
instance = new A();//3
}
}
}
return instance;
}
}

双重加锁的理念是这样的:

  1. 线程 A 进入 getInstance() 方法。
  2. 由于 instancenull,线程 A 在 //1 处进入 synchronized 块。
  3. 线程 A 被线程 B 预占。
  4. 线程 B 进入 getInstance() 方法。
  5. 由于 instance 仍旧为 null,线程 B 试图获取 //1 处的锁。然而,由于线程 A 持有该锁,线程 B 在 //1 处阻塞。
  6. 线程 B 被线程 A 预占。
  7. 线程 A 执行,由于在 //2 处实例仍旧为 null,线程 A 还创建一个 Singleton 对象并将其引用赋值给 instance
  8. 线程 A 退出 synchronized 块并从 getInstance() 方法返回实例。
  9. 线程 A 被线程 B 预占。
  10. 线程 B 获取 //1 处的锁并检查 instance 是否为 null
  11. 由于 instance 是非 null 的,并没有创建第二个 Singleton 对象,由线程 A 创建的对象被返回。

按理论来说,这是完美的。双重检查锁定的问题是:并不能保证它会在单处理器或多处理器计算机上顺利运行。双重检查锁定失败的问题并不归咎于 JVM 中的实现 bug,而是归咎于 Java 平台内存模型。内存模型允许所谓的“无序写入”,这也是这些习语失败的一个主要原因。

所以不要使用双重锁定!

内置锁的特性

  1. 自动获得和释放

    每个 Java 对象都可以隐式地扮演一个用于同步的锁的角色,这些内置的锁被称为内部锁(Intrinsic Lock)监视器锁(Monitor Lock),执行线程进入 synchronized 块之前自动获得锁,而无论是正常退出还是抛出异常,线程都会自动释放锁。因此获得内部锁的唯一途径是进入这个内部锁保护的同步块或方法。

  2. 互斥性

    内部锁在 Java 中扮演了互斥锁的角色,即至多只有一个线程可以拥有锁,没有获取到锁的线程只能等待或阻塞直到锁被释放,因此同步块可以线程安全地原子执行。

  3. 可重入性:

    可重入是指对于同一个线程,它可以重新获得已有它占用的锁。

    可重入性意味着锁的请求是基于”每线程”而不是基于”每调用”,它是通过为锁关联一个请求计数器和一个占有它的线程来实现。

    可重入性方便了锁行为的封装,简化了面向对象并发代码的开发,可以防止类继承引起的死锁,例子如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Widget {  
    public synchronized void doSomething(){
    ......
    }
    }

    public class LoggingWidget extends Widget {
    public synchronized void doSomething(){
    System.out.println(toString() + “: calling doSomething”);
    super.doSomething();
    }
    }

    子类 LoggingWidget 覆盖了父类 Widgetsynchronized 类型的 doSomething() 方法,并调用了父类的中的同步方法,因此子类 LoggingWidget 和父类 Widget 在调用 doSomething() 方法之前都会先获取 Widget 的锁,若内部锁没有可重入性,则 super.doSomething() 的调用就会因为无法获得锁而被死锁。

用锁来保护状态

一种常见的错误认识:只有在写入共享变量时才需要使用同步。

对于可能被多个线程同时访问的可变状态变量,在访问它时都需要持有一个锁,在这种情况下,我们称状态变量是由这个锁保护的。

每个共享的和可变的变量都应该只由一个锁来保护,从而使维护人员知道是哪一个锁。

对于每个包含多个变量的的不变性条件,其中涉及的所有变量都需要由同一个锁来保护。

当执行时间较长的计算或者可能无法快速完成的操作时,一定不要持有锁。

对象的共享

发布

发布一个对象的意思是:使对象能够在当前作用域之外的代码中使用。

例如:

  1. 将一个对象存储到其他代码可以访问到的地方。
  2. 在一个非私有的方法中放回该对象。
  3. 将该对象传递到其他类的方法中。
1
2
3
4
public static Set<Secret> secrets;
public void init(){
secrets = new HashSet<Secret>();
}

当发布某个对象时,可能会间接地发布其他对象。例如如果将一个 Secret 对象添加到集合 secrets 中,那么发布 secrets 的同时,也会发布 Secret 对象,因为任何代码都可以遍历这个集合,并获得对 Secret 对象的引用。

逸出

当某个不应该发布的对象被发布时,这种情况就是逸出。

对象逸出会导致对象内部状态暴露,可能会危及封装性,使程序难以维持稳定;若发布尚未构造完成的对象可能会危及安全问题。

最常见的逸出就是 This 引用在构造时逸出,导致 This 引用逸出的常见错误有:

  1. 在构造函数中启动线程:

    当对象在构造函数中显式或者隐式创建线程时,This 引用几乎总是被新线程共享,于是新的线程在所属对象完成构造之前就能看见它。
    避免构造函数中启动线程引起的 This 引用逸出的方法是不要在构造函数中启动新线程,取而代之的是在其他初始化或启动方法中启动对象拥有的线程。

  2. 在构造方法中调用可覆盖的实例方法:

    在构造方法中调用那些既不是 private 也不是 final 的可被子类覆盖的实例方法时,同样导致 this 引用逸出。
    避免此类错误的方法是不要在父类构造器中调用被子类覆盖的方法。

  3. 在构造方法中创建内部类:

    在构造方法中创建内部类实例时,内部类的实例包含了对封装实例的隐含引用,可能导致隐式 this 逸出,例子如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class ThisEscape {  
    public ThisEscape(EventSource source) {
    source.registerListener(new EventListener() {
    public void onEvent(Event e) {
    doSomething(e);
    }
    });
    }
    }

    上述例子中的 this 逸出可以使用工厂方法来避免,例子如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class SafeListener {  
    private final EventListener listener;

    private SafeListener(){
    listener = new EventListener(){
    public void onEvent(Event e){
    doSomething(e);
    }
    );
    }

    public static SafeListener newInstance(EventSource source) {
    SafeListener safe = new SafeListener();
    source.registerListener(safe.listener);
    return safe;
    }
    }

线程封闭

当访问共享的可变数据时,通常需要同步。一种避免使用同步的方式就是不同享数据,这叫做线程封闭。Java 提供了一些机制来维持线程封闭性,例如局部变量和 ThreadLocal 类。
线程封闭技术的一个常见应用是 JDBCConnection 对象。JDBC 规范不要求 Connection 对象时线程安全的,而要求连接池是线程安全的。线程聪哥线程池中获得一个 Connection 对象,并且用该对象来处理请求,使用完之后再返回给连接池。由于大多数请求(例如 Servlet 请求和 EJB)都是单个线程采用同步的方式来处理,并且在Connection 对象返回前,连接池不会再把它分配给其它线程,因此这种连接在处理请求时,把 Connection 对象封闭在线程中。

栈封闭(局部变量)

栈封闭是线程封闭的一种特例,只能通过局部变量才可以访问对象,局部变量使对象限制在执行线程中,存在于执行线程栈,其他线程无法访问到这个栈,从而确保了线程安全。(每一个线程都有一个工作内存,工作内存中包含有栈,局部的基本类型变量是处于栈中的,引用类型的引用处于处于栈中,而引用指向的对象处于堆中)

栈的限制如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int loadTheArk(Collection<Animal> candidates){  
SortedSet<Animal> animals;
int numPairs = 0;
Animal candidate = null;

//animals被限制在本地方法栈中
animals = new TreeSet<Animal>(new SpeciesGenderComparator());
animals.addAll(candidates);
for(Animal a : animals){
if(candidate == null || !candidate.isPotentialMate(a)){
candidate = a;
}else{
ark.load(new AnimalPair(candidate, a));
++numPairs;
candidate = null;
}
}
return numPairs;
}

指向 TreeSet 对象的唯一引用保存在 animals 中,而 animals 这个引用被封闭在局部变量中,因此封闭在线程本身的工作内存中,其它线程不能访问。如果发布了对集合的引用,那么线程的封闭性将被破坏,并且导致对象 animals 的逸出。

TheadLocal 类

ThreadLocal 线程本地变量是一种规范化的维护线程限制的方式,它允许将每个线程与持有数值的对象关联在一起,为每个使用它的线程维护一份单独的拷贝。ThreadLocal 提供了 setget 访问器,get 总是返回由当前线程通过 set 设置的最新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ThreadLocal<Integer> num = new ThreadLocal<Integer>(){
@Override
protected Integer initialValue() {
return 0;
}
};

public ThreadLocal<Integer> getThreadLocal(){
return num;
}

public int getNextNum(){
num.set(num.get() + 1);
return num.get();
}

我们来看看 ThreadLocal 是如何做到对每一个线程都做到独立的副本的。

1
2
3
4
5
6
7
8
9
10
//源码
public void set(T value){
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map != null){
map.set(this, value);
}else{
createMap(t, value);
}
}

set 方法中我们可以看到,ThreadLocalMap 这个应该是关键,把 ThreadLocalMap 看成一个 mapThreadLocalMap是通过 getMap(t) 方法获得的,传入的 t 是当前线程,也就是说,ThreadLocalMap是与各自线程绑定的。此后,ThreadLocalMap 通过 set 方法,把当前的 ThreadLocal 作为 key,传入的值作为 value 保存在 ThreadLocalMap 中。ThreadLocal 通过操作每一个线程特有的 ThreadLocalMap 对象,从而实现了变量访问在不同线程中的隔离。

不可变对象

如果一个对象在创建后其状态就不能被修改,那么这个对象就称为不可变对象。

不可变对象需要满足下面条件:

  1. 对象本身是 final 的(避免被子类化),声明属性为 privatefinal
  2. 不可变对象的状态在创建后就不能再改变,不要提供任何可以修改对象状态的方法。(不仅仅是set方法, 还有任何其它可以改变状态的方法,每次对他们的改变都是产生了新的不可变对象的对象)
  3. 不可变对象能被正确地创建(在创建过程中没有发生 this 引用逸出)。
  4. 如果类有任何可变对象属性, 那么当它们在类和类的调用者间传递的时候必须被保护性拷贝。

不可变对象一定是线程安全的,不需要任何同步或锁的机制就可以保证安全地在多线程之间共享。

对象的组合

通过一些组合模式来实现线程安全。

设计线程安全的类

设计线程安全类的三个基本要素:

  1. 找出构成对象状态的所有变量。
  2. 招数约束状态变量的不变性条件。
  3. 建立对象状态的并发访问管理策略。

要分析对象的状态,首先从对象的域开始。如果对象所有的域都是基本类型的变量,那么这些域将构成对象的全部状态;如果对象的域中引用了其他对象,那么该对象的状态将包含被引出的对象的域。

实例封闭

当一个对象被封装到另一个对象中,能够访问到被封装对象的所有代码路径都是已知的。通过将封闭机制和合适的加锁策略结合,可以确保以线程安全的方式来使用非线程安全的对象。

1
2
3
4
5
6
7
8
9
10
11
public class PersonSet{  
private final Set<Person> mySet = new HashSet<Person>();

public sychronized void addPersion(Person p) {
mySet.add(p)
}

public sychronized boolean containsPerson(Person p) {
return mySet.contains(p);
}
}

解析:虽然 HashSet 并非线程安全,但是 mySet 是私有的。唯一能访问 mySet 的代码是 addPersion()containsPersion()。在执行上他们都要获得 PersionSet 上的锁,PersionSet 的状态完全又由它的内置锁保护。所以 PersionSet 是一个线程安全的类。

Java 平台的类库有很多实例封装的例子。比如一些基本的容器并非线程安全的,如 ArrayListHashMap。类库提供的包装器方法:Collections.synchronized(list)Collections.synchronizedMap(map) 等,只要这些包装器对象拥有对包装容器对象的唯一引用(即把容器对象封装在包装器中),非线程安全的类就可以在多线程中使用了。

线程安全性的委托

1
2
3
4
5
6
7
8
class Counter {

private AtomicInteger count = new AtomicInteger(0);

private int inc(){
return count.incrementAndGet();
}
}

对于 Counter 来说,由于 Counter 只有一个域就是 AtomicInteger,而 AtomicInteger 又是线程安全的,所以很容易知道 Counter 是线程安全的。Counter 把它的线程安全性交给了 AtomicInteger 来决定,也就是委托给了 AtomicInteger 来保证。

但是,当委托的状态变量超过 1 个时,就要看情况而定了。要看委托的状态变量之间是否有某种联系,如果委托的状态域是彼此独立的,那么不会影响组合的类的线程安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class ListenerManager {

private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<KeyListener>();
private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<MouseListener>();

public void addKeyListener(KeyListener e){
keyListeners.add(e);
}

public void addMouseListener(MouseListener e){
mouseListeners.add(e);
}

public void removeKeyListener(KeyListener e){
keyListeners.remove(e);
}

public void removeMouseListener(MouseListener e){
mouseListeners.remove(e);
}
}

对于 ListenerManager 来说,它把它的线程安全委托给了 keyListenersmouseListeners,而这两个状态变量在类中不存在任何的耦合关系,因此它们组合而成的类也是线程安全的。(CopyOnWriteArrayList 是一个线程安全的链表)

当委托的多个状态存在耦合关系时,委托可能会失效!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class NumberRange {

private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);

public void setLower(int i){
//不安全的 先检查后执行
if(i > upper.get()){
throw new RuntimeException("最小值不能比最大值大");
}
lower.set(i);
}

public void setUpper(int i){
//不安全的 先检查后执行
if(i < lower.get()){
throw new RuntimeException("最大值不能比最小值小");
}
upper.set(i);
}
}

虽然 lowerupper 都是原子操作,但是由于在整个类中存在一个不变形的条件——lower <= uppersetLower()setUpper() 都是“先检查后执行”的不安全操作,没有采取足够的加锁机制来保证这两个方法是一个原子操作(前面提到了,非原子操作在多线程环境下是线程不安全的)。

如果一个类,仅仅靠委托状态不足以维持线程的安全性,这种情况下,这个类必须提供自己的加锁机制保证这些复合操作是原子操作!

客户端加锁机制实现线程安全

1
2
3
4
5
6
7
8
9
10
11
class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
....
public synchronized boolean putIfAbsent(E x){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}

这种通过扩展类,来实现我们想要的功能——如果没有,则添加。我们很自然想到,在多线程环境下,putIfAbsent 不是一个原子操作,因此我们会通过加锁来实现线程同步,那么真的实现了线程安全吗?

但实际上,这里用了两个不同的锁。

list 是一个线程安全的链表,list 中使用的锁的对象是 List 本身,而 putIfAbsent 中加的锁的对象是 ListHelper,使用了不同的锁,意味着 ListHelper 相对于 list 来说,并不是原子操作,也就有可能一个线程调用putIfAbsent 操作时,另一个线程调用其他 list 的其它方法。

下面是正确的加锁方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());

public boolean putIfAbsent(E x){
synchronized(list){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}
}

但是这种扩展类的方式仍然不值得推荐,因为会破坏同步策略的封装性。

组合

为一个已有的类拓展原子操作时,有一种更好的方法:组合(Composition)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//推荐
public class ImprovedList<T> implements List<T> {
private final List<T> list;
public ImprovedList(List<T> list) {
//传递之后客户端不再直接使用 list,而通过 ImprovedList 的实例操作 list
this.list = list;
}
public synchronized boolean putIfAbsent(T x) {
boolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
//...按照类似的方式将 ImprovedList 的其他方法实现委托给 List,
}

Java类库的基础构建模块

同步容器类

同步容器类包括 VectorHashTable,以及使用 Collections.synchronizedXxx(例如Collections.synchronizedList(new ArrayList< T>()))等工厂方法创建的同步类。

这些类实现线程安全的方式是:把他们的状态封装起来,并对每一个共有方法都进行同步,使得每次只有一个线程能访问容器的状态。(从这个描述,可以看出,在高并发情况下可能效率是一个问题)

同步容器类存在的问题: 虽然同步容器类是线程安全的,对于 VectorHashTable,在类中提供的操作都是原子操作的,在多线程环境下就可以放心使用 VectorHashTable 的方法。但是在一些复合操作上还是要加锁来实现同步,例如:迭代,条件运算(若不存在则添加)。

例如:

1
2
3
4
public static Object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}

对于这种非原子操作,必须加锁达到线程同步

1
2
3
4
5
6
public static Object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}

并发容器类

同步容器类将所有对容器状态的访问都加了锁,以实现线程安全,代价就是严重降低了并发性,当多个线程竞争容器的锁时,吞吐量严重降低,而为了改善同步容器的性能,Java 针对多个线程并发访问提供了并发容器类

例如:

  • ConcurrentMap,用来替代同步基于散列的 Map
  • CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的 List
  • QueueBlockingQueue

ConcurrentHashMap

ConcurrentMap 使用了一个更加细化的锁机制,名叫分离锁。这个机制允许更深层次的共享访问,任意数量的读取线程可以并发的访问 Map,读者和写者也可以并发的访问,并且有限数量的写线程还可以并发修改 Map, 结果是为并发带来更高的吞吐量,同时几乎没有损失单线程访问的性能。

ConcurrentMap 接口加入了对常见复合操作的支持, 比如”缺少即加入(putIfAbsent)”、替换和条件删除,,而且这些操作都是原子操作。

1
2
3
4
5
6
public interface ConcurrentMap<K, V> extends Map<K, V> {
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
}

由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。(外部的锁与内部的分段锁不同,无法保证独占访问)

CopyOnWriteArrayList

CopyOnWriteArrayList 用于替代同步 List,并且在迭代期间不需要对容器进行加锁或复制。多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰,“写入时复制”容器的迭代器不会抛出 ConcurrentModificationException ,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。

在每次 CopyOnWriteArrayList 修改时都需要对底层数组进行一次复制,因此当容器比较大时,不是很合适,只有当容器迭代操作的品路远远高于对容器的修改的频率,写入即复制容器是一个合适的选择。

Queue

BlockingQueue 提供了可阻塞的 puttake 方法,它们与可定时的 offerpoll 是等价的。如果 Queue 已经满了, put 方法会被阻塞直到有空间可用; 如果 queue 是空的, 那么 take 方法会被阻塞, 直到有元素可用。queue的长度可以有限, 也可以无限。

可以使用 BlockingQueueoffer 方法来处理这样一种场景:如果条目不能被加入到队列里, 它会返回一个失败状态。这样可以创建更多灵活的策略来处理超负荷工作, 比如减轻负载, 序列化剩余工作条目并写入硬盘, 减少生产者线程, 或者其他方法儿子生产者线程。

在类库中包含了 BlockingQueue 的多种实现,其中,LinkedBlockingQueueArrayBlockingQueueFIFO 队列,二者分别与 LinkedListArrayList 相似,但比同步的 List 拥有更好的并发性能。PriorityBlockingQueue 是一个按优先级排序的队列,而不是 FIFO

SynchronousQueue 是一种 BlockingQueue 的实现,维护了一个没有存储空间的 Queue,如果用洗盘子来比喻的话,可以认为没有盘子架,直接将洗好的盘子放到烘干机中,因为是直接交付,这样可以减少数据在生产者和消费者之间移动的延迟。因为 SynchronousQueue 没有存储能力,所以除非另一个线程已经准备好移交工作,否则 puttake 会一直阻止,这类队列只有在消费者充足的比较合适,它们总是为下一个任务做好准备。

Deque

DequeBlockingDeque ,它们分别对 QueueBlockingQueue 进行了拓展。Deque 释义双端队列,允许高效地在头和尾进行插入和删除,其实现有 ArrayDequeLinkedBlockingDeque

双端队列适合一种窃取的工作模式,其原理是每一个消费者都有一个自己的双端列表,如果一个消费者完成了自己的双端队列中的全部工作,它可以偷取其他消费者的双端队列中末尾的任务,由于消费者不会共享一个队列,因此相对于传统的生产者-消费者模式具有更高的可伸缩性,而且即使一个工作者要访问另一个队列,也是从末尾截取,这样可以进一步降低对队列的争夺。

阻塞方法与中断方法

当线程阻塞时,它通常被挂起,并处于某种阻塞状态。阻塞操作与执行时间很长的普通操作差别在于:被阻塞的线程必须等待某个不受它控制的事件发生之后才能继续执行,例如等待 I/O 操作完成,等待某个锁可用,或者等待外部的计算结束。

中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程 A 中断 B 时,A 仅仅是要求 B 在某个可以暂停的地方停止正在执行的操作——前提是如果线程 B 愿意停下来。

当代码中调用一个将抛出 InterruptedException 异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的相应。对于库代码来说,有两种基本的选择:

  1. 传递 InterruptedException:只需把 InterruptedException 传递给方法的调用者。传递 InterruptedException 的方法有,根本不捕获该异常,或捕获该异常,然后执行某种简单的操作清理工作后再次抛出这个异常。

  2. 恢复中断:有时候不能抛出异常,例如代码是在 Runnable 中,此时必须捕获 InterruptedException ,并通过调用当前线程上的 interrupt 方法恢复中断。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class TaskRunnable implements Runnable{
    BlockingQueue<Task> queue;
    ...
    public void run(){
    try{
    processTask(queue.take());
    }catch(InterruptedException e){
    //恢复被中断的状态
    Thread.currentThread().interrupt();
    }
    }
    }

同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barried)以及闭锁(Latch),如果不能满足自己的需求,还能自己定制同步工具类。

闭锁(Latch)

闭锁可以延迟线程的进度直到其到达终止状态,一个闭锁工作起来就像一道大门:直到闭锁达到终点状态之前, 门一直是关闭的, 没有线程通过, 在终点状态到来的时候, 这扇门会打开并允许所有的线程通过。一旦闭锁到达了终点状态, 它就不能再改变状态了, 所以它会永远保打开状态。

闭锁的一些示例:

  1. 确保某个计算在其需要的所有其他服务都被初始化之后才继续执行。
  2. 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
  3. 等待之后某个操作的所有参与者(例如吃鸡里面的玩家)都准备就绪之后再继续执行。

CountDownLatch 是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件发生。它的状态包括一个计数器, 初始化为一个正数, 用来表现需要等待的事件数。countDown() 方法对计数器做减操作, 表示一个事件已经发生了, 而 await() 方法等待计数器达到零, 这表示所有需要等待的时间都已经发生。如果计数器入口时值为非零, await() 会一直阻塞知道计数器为零, 或者等待线程中断以及超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TestHarness {  
public long timeTasks(int n, final Runnable task) throws Exception {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await(); // 所有线程运行到此被暂停, 等待一起被执行
try {
task.run();
} finally {
endGate.countDown();
}
} catch (Exception e) {
}
};
};
t.start();
}

long start = System.nanoTime();
startGate.countDown(); // 启动所有被暂停的线程
endGate.await(); // 等待所有线程执行完
long end = System.nanoTime();
return end - start;
}

public static void main(String[] args) {
TestHarness th = new TestHarness();
Runnable r = new Runnable() {
public void run() {
System.out.println("running");
}
};
try {
th.timeTasks(10, r);
} catch (Exception e) {
e.printStackTrace();
}
}

startGate 是一个开始门,endGate 是结束门。startGate 初始为 1,而 endGate 初始为工作线程的数量。

FutureTask

FutureTask 的计算是通过 Callable 实现的,它等价于一个可以携带结果的 Runnable ,并且有三个状态:等待运行、正在运行和与运行完成。

运行完成有三种情况:

正常结束、取消结束和异常结束。一旦 FutureTask 进入完全状态,它会永远停止在这个状态上。

FutureTask.get() 的行为依赖于任务的状态,如果它已经完成,get() 可以立即返回结果,否则会被阻塞,知道任务转入完成状态,然后会返回结果或者抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class PreLoader <V> {
private final FutureTask<V> future = new FutureTask<V>(new Callable<V>() {

@Override
public V call() throws Exception {
//
return loadTask();
}


});

private Thread thread = new Thread(future);

public void start(){
thread.start();
}

public V get() throws InterruptedException, ExecutionException{
return future.get();
}

private V loadTask() {
//模拟加载任务
return null;
}
}

信号量(Semaphore)

计算信号量用来控制能够同时访问某特定资源的活动的数量或者同时执行某一给定的操作的数量。

作用:技术信号量可以用来实现资源池或者给一个容器设置边界。

一个 Semaphore 管理一个有效的许可集,许可的初始量通过构造函数来指定。活动能够获得许可, 并在使用之后释放许可, 如果已经没有可用的许可了, 那么 acquire 会被阻塞,直到有可用的为止(或者直到被中断或者操作超时),release 方法向信号量返回一个许可。
一个初始值为 1 的 Semaphore 可以用来充当 mutex (互斥锁)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class BoundedHashSet <T>{  
private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int n) {
set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(n);
}

public boolean add(T element) {
try {
sem.acquire(); //请求信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = false;
try {
result = set.add(element);
}finally {
sem.release();
}
return result;
}

public void remove(T o) {
boolean result = set.remove(o);
if (result) {
sem.release(); //返回信号量
}
}

public static void main(String[] args) {
final BoundedHashSet<String> bhs = new BoundedHashSet<String>(3);
for (int i = 0; i < 4; i++) {
Thread t = new Thread() {
@Override
public void run() {
bhs.add(System.currentTimeMillis() + "");
};
};
t.start();
}
}
}

栅栏(Barrier)

栅栏类似于闭锁, 他们能够阻塞一组线程, 直到某些事件发生, 其中栅栏与闭锁的关键不同在于, 所有线程必须同时达到关卡点, 才能继续处理。闭锁等待的是事件, 关卡等待其他线程。栅栏实现的是协议, 就像一些家庭成员指定商场中的集合地点:”我们每一个人6:00在麦当劳见, 到了以后不见不散, 之后我们再决定接下来做什么。”

CyclicBarrier 允许一个给定数量的成员多次集中在一个栅栏位置,这在并行迭代算法中非常有用, 这个算法会把一个问题拆分成一系列相互独立的子问题, 当线程到达栅栏位置时, 调用 await, await 将会阻塞所有线程到达栅栏位置,直到所有线程到达关卡点。

关卡通常用来模拟这种情况, 一个步骤的计算可以并行完成, 但是要求必须完成所有与一个步骤相关的工作后才能进入下一步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Cellular {  
private CyclicBarrier cb;
private Worker[] workers;

public Cellular() {
int count = Runtime.getRuntime().availableProcessors();
workers = new Worker[count];
for (int i = 0; i < count; i++) {
workers[i] = new Worker();
}

cb = new CyclicBarrier(count, new Runnable() {
public void run() {
System.out.println("the workers is all end...");
}
});
}

public void start() {
for (Worker worker : workers) {
new Thread(worker).start();
}
}

private class Worker implements Runnable {
public void run() {
System.out.println("working...");
try {
cb.await();//在这里线程阻塞,等待其他线程。
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Cellular c = new Cellular();
c.start();
}
}

Exchanger 是栅栏的另外一种形式, 它是一种两步栅栏, 在栅栏交汇点会叫唤数据, 当两方进行的活动不对称时, Exchanger 是非常有用的, 比如当一个线程向缓冲写入一个数据, 这是另一个线程充当消费者使用这个数据。

总结

  1. 可变状态越少,越容易保证线程安全性
  2. 尽量将域声明为 final,除非需要它们是可变的
  3. 不可变对象一定是线程安全的
  4. 封装有利于管理复杂性
  5. 用锁来保护每一个可变变量
  6. 当保护同一个不变性条件的所有变量时,要使用同一个锁(最容易忽略)
  7. 在执行复合操作时,要持有锁